package com.it.cloud.producer.threadWork;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * 生产者线程
 */
public class KafkaProducerThread implements Runnable {

    private KafkaProducer<String, String> producer = null;

    private ProducerRecord<String, String> record = null;

    public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metaData,
                                     Exception exception) {
                if (null != exception) {// 发送异常记录异常信息
                    System.out.println("Send message exception:" + exception);
                }
                if (null != metaData) {
                    System.out.println(String.format("offset:%s,partition:%s",
                            metaData.offset(), metaData.partition()));
                }
            }

        });
    }

}
